package com.derbysoft.nuke.kafka.manager.infrastructure.kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Component;

import java.util.*;

/**
 * Created by Albert Fu on 18/05/2018.
 *
 * <p>{@link KafkaClient} implementation that keeps one {@link AdminClient} and one
 * {@link KafkaAdminClient} per bootstrap-servers string. Each client is created the first
 * time it is requested and the same cached instance is returned on later requests.
 *
 * <p>Both caches are synchronized maps, and the creation of new entries is serialized on
 * this instance's monitor.
 */
@Component
public class KafkaClientImpl implements KafkaClient {

    // Cache of Kafka AdminClient instances, keyed by the bootstrap-servers string.
    private Map<String, AdminClient> adminClients = Collections.synchronizedMap(new HashMap<>());
    // Cache of KafkaAdminClient wrappers, keyed by the bootstrap-servers string.
    private Map<String, KafkaAdminClient> kafkaAdminClients = Collections.synchronizedMap(new HashMap<>());

    /**
     * Returns the cached {@link AdminClient} for the given bootstrap servers, creating and
     * caching it on first use.
     *
     * @param bootstrapServers value used for the {@code bootstrap.servers} admin setting and
     *                         as the cache key
     * @return the admin client associated with {@code bootstrapServers}
     */
    @Override
    public AdminClient getAdminClient(String bootstrapServers) {
        AdminClient cached = adminClients.get(bootstrapServers);
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Look again under the lock: another thread may have created the client meanwhile.
            AdminClient existing = adminClients.get(bootstrapServers);
            if (existing != null) {
                return existing;
            }
            AdminClient created = newAdminClient(bootstrapServers);
            adminClients.put(bootstrapServers, created);
            return created;
        }
    }

    /**
     * Returns the cached {@link KafkaAdminClient} for the given bootstrap servers, creating a
     * {@link DefaultKafkaAdminClient} and caching it on first use.
     *
     * @param bootstrapServers value passed to the {@link DefaultKafkaAdminClient} constructor
     *                         and used as the cache key
     * @return the Kafka admin client associated with {@code bootstrapServers}
     */
    @Override
    public KafkaAdminClient getKafkaAdminClient(String bootstrapServers) {
        KafkaAdminClient cached = kafkaAdminClients.get(bootstrapServers);
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            // Look again under the lock: another thread may have created the client meanwhile.
            KafkaAdminClient existing = kafkaAdminClients.get(bootstrapServers);
            if (existing != null) {
                return existing;
            }
            KafkaAdminClient created = new DefaultKafkaAdminClient(bootstrapServers);
            kafkaAdminClients.put(bootstrapServers, created);
            return created;
        }
    }

    // Builds an AdminClient whose only explicit setting is the bootstrap-servers list.
    private static AdminClient newAdminClient(String bootstrapServers) {
        Map<String, Object> settings = new HashMap<>();
        settings.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return AdminClient.create(settings);
    }
}
